home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.4)
-
- '''SPF (Sender-Permitted From) implementation.
-
- Copyright (c) 2003, Terence Way
- This module is free software, and you may redistribute it and/or modify
- it under the same terms as Python itself, so long as this copyright message
- and disclaimer are retained in their original form.
-
- IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
- SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
- THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
- DAMAGE.
-
- THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
- LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
- PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
- AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
- SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
-
- For more information about SPF, a tool against email forgery, see
- \thttp://spf.pobox.com
-
- For news, bugfixes, etc. visit the home page for this implementation at
- \thttp://www.wayforward.net/spf/
- '''
- __author__ = 'Terence Way'
- __email__ = 'terry@wayforward.net'
- __version__ = '1.6: December 18, 2003'
- MODULE = 'spf'
- USAGE = 'To check an incoming mail request:\n % python spf.py {ip} {sender} {helo}\n % python spf.py 69.55.226.139 tway@optsw.com mx1.wayforward.net\n\nTo test an SPF record:\n % python spf.py "v=spf1..." {ip} {sender} {helo}\n % python spf.py "v=spf1 +mx +ip4:10.0.0.1 -all" 10.0.0.1 tway@foo.com a \n\nTo fetch an SPF record:\n % python spf.py {domain}\n % python spf.py wayforward.net\n\nTo test this script (and to output this usage message):\n % python spf.py\n'
- import re
- import socket
- import struct
- import time
- import DNS
- MASK = 0xFFFFFFFFL
- RE_MODIFIER = re.compile('^([a-zA-Z]+)=')
- RE_CHAR = re.compile('%(%|_|-|(\\{[a-zA-Z][0-9]*r?[^\\}]*\\}))')
- RE_ARGS = re.compile('([0-9]*)(r?)([^0-9a-zA-Z]*)')
- JOINERS = {
- 'l': '.',
- 's': '.' }
- RESULTS = {
- '+': 'pass',
- '-': 'deny',
- '?': 'unknown',
- 'pass': 'pass',
- 'deny': 'deny',
- 'unknown': 'unknown' }
- EXPLANATIONS = {
- 'pass': 'sender SPF verified',
- 'deny': 'access denied',
- 'unknown': 'SPF unknown' }
-
- try:
- (bool, True, False) = (bool, True, False)
- except NameError:
- (False, True) = (0, 1)
-
- def bool(x):
- return not (not x)
-
-
-
- def check(i, s, h):
- """Test an incoming MAIL FROM:<s>, from a client with ip address i.
- \th is the HELO/EHLO domain name.
-
- \tReturns (result, mta-status-code, explanation) where result in
- \t['pass', 'unknown', 'deny', 'error'].
-
- \tExample:
- \t>>> check(i='127.0.0.1', s='terry@wayforward.net', h='localhost')
- \t('pass', 250, 'local connections always pass')
- \t"""
- if i.startswith('127.'):
- return ('pass', 250, 'local connections always pass')
-
-
- try:
- q = query(i = i, s = s, h = h)
- return q.check(q.dns_spf(q.d))
- except DNS.DNSError:
- return ('error', 450, 'SPF DNS Error')
-
-
-
- class query(object):
- """A query object keeps the relevant information about a single SPF
- \tquery:
-
- \ti: ip address of SMTP client
- \ts: sender declared in MAIL FROM:<>
- \tl: local part of sender s
- \td: current domain, initially domain part of sender s
- \th: EHLO/HELO domain
- \tv: 'in-addr' for IPv4 clients and 'ip6' for IPv6 clients
- \tt: current timestamp
- \tp: SMTP client domain name
- \to: domain part of sender s
-
- \tThis is also, by design, the same variables used in SPF macro
- \texpansion.
-
- \tAlso keeps cache: DNS cache.
- \t"""
-
- def __init__(self, i, s, h):
- self.i = i
- self.s = s
- self.h = h
- (self.l, self.o) = split_email(s, h)
- self.t = str(int(time.time()))
- self.v = 'in-addr'
- self.d = self.o
- self.p = None
- self.cache = { }
-
-
- def getp(self):
- if not self.p:
- p = self.dns_ptr(self.i)
- if len(p) > 0:
- self.p = p[0]
- else:
- self.p = self.i
-
- return self.p
-
-
- def check(self, spf):
- """
- \t\tReturns (result, mta-status-code, explanation) where
- \t\tresult in ['deny', 'unknown', 'pass']
- \t\t"""
- return self.check1(spf, self.d, 0)
-
-
- def check1(self, spf, domain, recursion):
- if recursion > 10:
- return ('unknown', 250, 'SPF recursion limit exceeded')
-
-
- try:
- tmp = self.d
- self.d = domain
- return self.check0(spf, recursion)
- finally:
- self.d = tmp
-
-
-
- def check0(self, spf, recursion):
- """Test this query information against SPF text.
-
- \t\tReturns (result, mta-status-code, explanation) where
- \t\tresult in ['deny', 'unknown', 'pass']
- \t\t"""
- if not spf:
- return ('unknown', 250, 'no SPF record')
-
- spf = spf.split()[1:]
- exps = dict(EXPLANATIONS)
- redirect = None
- default = 'unknown'
- for m in spf:
- m = RE_MODIFIER.split(m)[1:]
- if len(m) != 2:
- continue
-
- if m[0] == 'exp':
- exps['deny'] = exps['unknown'] = self.get_explanation(m[1])
- continue
- if m[0] == 'redirect':
- redirect = self.expand(m[1])
- continue
- if m[0] == 'default':
- default = RESULTS.get(m[1], default)
- continue
-
- for m in spf:
- if RE_MODIFIER.match(m):
- continue
-
- (m, arg, cidrlength) = parse_mechanism(m, self.d)
- result = RESULTS.get(m[0])
- if result:
- m = m[1:]
- else:
- result = 'pass'
- if m in [
- 'a',
- 'mx',
- 'ptr',
- 'exists',
- 'include']:
- arg = self.expand(arg)
-
- if m == 'include':
- if arg != self.d:
- tmp = self.check1(self.dns_spf(arg), arg, recursion + 1)
- if tmp[0] == 'pass':
- break
-
- if tmp[0] != 'fail':
- return tmp
-
-
- arg != self.d
- if m == 'all':
- break
- continue
- if m == 'exists':
- if len(self.dns_a(arg)) > 0:
- break
-
- len(self.dns_a(arg)) > 0
- if m == 'a':
- if cidrmatch(self.i, self.dns_a(arg), cidrlength):
- break
-
- cidrmatch(self.i, self.dns_a(arg), cidrlength)
- if m == 'mx':
- if cidrmatch(self.i, self.dns_mx(arg), cidrlength):
- break
-
- cidrmatch(self.i, self.dns_mx(arg), cidrlength)
- if m == 'ip4' and arg != self.d:
- if cidrmatch(self.i, [
- arg], cidrlength):
- break
-
- cidrmatch(self.i, [
- arg], cidrlength)
- if m == 'ptr':
- if domainmatch(self.validated_ptrs(self.i), arg):
- break
-
- domainmatch(self.validated_ptrs(self.i), arg)
- result = 'unknown'
- elif redirect:
- return self.check1(self.dns_spf(redirect), redirect, recursion + 1)
- else:
- result = default
- if result == 'deny':
- return (result, 550, exps[result])
- else:
- return (result, 250, exps[result])
-
-
- def get_explanation(self, spec):
- '''Expand an explanation.'''
- return self.expand(''.join(self.dns_txt(self.expand(spec))))
-
-
- def expand(self, str):
- """Do SPF RFC macro expansion.
-
- \t\tExamples:
- \t\t>>> q = query(s='strong-bad@email.example.com',
- \t\t... h='mx.example.org', i='192.0.2.3')
- \t\t>>> q.p = 'mx.example.org'
-
- \t\t>>> q.expand('%{d}')
- \t\t'email.example.com'
-
- \t\t>>> q.expand('%{d4}')
- \t\t'email.example.com'
-
- \t\t>>> q.expand('%{d3}')
- \t\t'email.example.com'
-
- \t\t>>> q.expand('%{d2}')
- \t\t'example.com'
-
- \t\t>>> q.expand('%{d1}')
- \t\t'com'
-
- \t\t>>> q.expand('%{p}')
- \t\t'mx.example.org'
-
- \t\t>>> q.expand('%{p2}')
- \t\t'example.org'
-
- \t\t>>> q.expand('%{dr}')
- \t\t'com.example.email'
- \t
- \t\t>>> q.expand('%{d2r}')
- \t\t'example.email'
-
- \t\t>>> q.expand('%{l}')
- \t\t'strong-bad'
-
- \t\t>>> q.expand('%{l-}')
- \t\t'strong.bad'
-
- \t\t>>> q.expand('%{lr}')
- \t\t'strong-bad'
-
- \t\t>>> q.expand('%{lr-}')
- \t\t'bad.strong'
-
- \t\t>>> q.expand('%{l1r-}')
- \t\t'strong'
-
- \t\t>>> q.expand('%{ir}.%{v}._spf.%{d2}')
- \t\t'3.2.0.192.in-addr._spf.example.com'
-
- \t\t>>> q.expand('%{lr-}.lp._spf.%{d2}')
- \t\t'bad.strong.lp._spf.example.com'
-
- \t\t>>> q.expand('%{lr-}.lp.%{ir}.%{v}._spf.%{d2}')
- \t\t'bad.strong.lp.3.2.0.192.in-addr._spf.example.com'
-
- \t\t>>> q.expand('%{ir}.%{v}.%{l1r-}.lp._spf.%{d2}')
- \t\t'3.2.0.192.in-addr.strong.lp._spf.example.com'
-
- \t\t>>> q.expand('%{p2}.trusted-domains.example.net')
- \t\t'example.org.trusted-domains.example.net'
-
- \t\t>>> q.expand('%{p2}.trusted-domains.example.net')
- \t\t'example.org.trusted-domains.example.net'
-
- \t\t"""
- end = 0
- result = ''
- for i in RE_CHAR.finditer(str):
- result += str[end:i.start()]
- macro = str[i.start():i.end()]
- if macro == '%%':
- result += '%'
- elif macro == '%_':
- result += ' '
- elif macro == '%-':
- result += '%20'
- else:
- letter = macro[2].lower()
- if letter == 'p':
- self.getp()
-
- expansion = getattr(self, letter, '')
- if expansion:
- result += expand_one(expansion, macro[3:-1], JOINERS.get(letter))
-
- end = i.end()
-
- return result + str[end:]
-
-
- def dns_spf(self, domain):
- '''Get the SPF record recorded in DNS for a specific domain
- \t\tname. Returns None if not found, or if more than one record
- \t\tis found.
- \t\t'''
- a = _[1]
-
-
- def dns_txt(self, domainname):
- return [ t for a in self.dns(domainname, 'TXT') for t in a ]
-
-
- def dns_mx(self, domainname):
- '''Get a list of IP addresses for all MX exchanges for a
- \t\tdomain name.
- \t\t'''
- return [ a for mx in self.dns(domainname, 'MX') for a in self.dns_a(mx[1]) ]
-
-
- def dns_a(self, domainname):
- '''Get a list of IP addresses for a domainname.'''
- return self.dns(domainname, 'A')
-
-
- def dns_aaaa(self, domainname):
- '''Get a list of IPv6 addresses for a domainname.'''
- return self.dns(domainname, 'AAAA')
-
-
- def validated_ptrs(self, i):
- '''Figure out the validated PTR domain names for a given IP
- \t\taddress.
- \t\t'''
- return _[1]
-
-
- def dns_ptr(self, i):
- '''Get a list of domain names for an IP address.'''
- return self.dns(reverse_dots(i) + '.in-addr.arpa', 'PTR')
-
-
- def dns(self, name, qtype):
- """DNS query.
-
- \t\tIf the result is in cache, return that. Otherwise pull the
- \t\tresult from DNS, and cache ALL answers, so additional info
- \t\tis available for further queries later.
-
- \t\tCNAMEs are followed.
-
- \t\tIf there is no data, [] is returned.
-
- \t\tpre: qtype in ['A', 'AAAA', 'MX', 'PTR', 'TXT', 'SPF']
- \t\tpost: isinstance(__return__, types.ListType)
- \t\t"""
- result = self.cache.get((name, qtype))
- cname = None
- if not result:
- req = DNS.DnsRequest(name, qtype = qtype)
- resp = req.req()
- for a in resp.answers:
- k = (a['name'], a['typename'])
- v = a['data']
- if k == (name, 'CNAME'):
- cname = v
-
- self.cache.setdefault(k, []).append(v)
-
- result = self.cache.get((name, qtype), [])
-
- if not result and cname:
- result = self.dns(cname, qtype)
-
- return result
-
-
-
- def split_email(s, h):
- """Given a sender email s and a HELO domain h, create a valid tuple
- \t(l, d) local-part and domain-part.
-
- \tExamples:
- \t>>> split_email('', 'wayforward.net')
- \t('postmaster', 'wayforward.net')
-
- \t>>> split_email('foo.com', 'wayforward.net')
- \t('postmaster', 'foo.com')
-
- \t>>> split_email('terry@wayforward.net', 'optsw.com')
- \t('terry', 'wayforward.net')
- \t"""
- if not s:
- return ('postmaster', h)
- else:
- parts = s.split('@', 1)
- if len(parts) == 2:
- return tuple(parts)
- else:
- return ('postmaster', s)
-
-
- def parse_mechanism(str, d):
- """Breaks A, MX, IP4, and PTR mechanisms into a (name, domain,
- \tcidr) tuple. The domain portion defaults to d if not present,
- \tthe cidr defaults to 32 if not present.
-
- \tExamples:
- \t>>> parse_mechanism('a', 'foo.com')
- \t('a', 'foo.com', 32)
-
- \t>>> parse_mechanism('a:bar.com', 'foo.com')
- \t('a', 'bar.com', 32)
-
- \t>>> parse_mechanism('a/24', 'foo.com')
- \t('a', 'foo.com', 24)
-
- \t>>> parse_mechanism('a:bar.com/16', 'foo.com')
- \t('a', 'bar.com', 16)
- \t"""
- a = str.split('/')
- if len(a) == 2:
- a = a[0]
- port = int(a[1])
- else:
- a = str
- port = 32
- b = a.split(':')
- if len(b) == 2:
- return (b[0], b[1], port)
- else:
- return (a, d, port)
-
-
- def reverse_dots(name):
- """Reverse dotted IP addresses or domain names.
-
- \tExample:
- \t>>> reverse_dots('192.168.0.145')
- \t'145.0.168.192'
-
- \t>>> reverse_dots('email.example.com')
- \t'com.example.email'
- \t"""
- a = name.split('.')
- a.reverse()
- return '.'.join(a)
-
-
- def domainmatch(ptrs, domainsuffix):
- """grep for a given domain suffix against a list of validated PTR
- \tdomain names.
-
- \tExamples:
- \t>>> domainmatch(['FOO.COM'], 'foo.com')
- \t1
-
- \t>>> domainmatch(['moo.foo.com'], 'FOO.COM')
- \t1
-
- \t>>> domainmatch(['moo.bar.com'], 'foo.com')
- \t0
-
- \t"""
- domainsuffix = domainsuffix.lower()
- for ptr in ptrs:
- ptr = ptr.lower()
- if ptr == domainsuffix or ptr.endswith('.' + domainsuffix):
- return True
- continue
-
- return False
-
-
- def cidrmatch(i, ipaddrs, cidr_length = 32):
- """Match an IP address against a list of other IP addresses.
-
- \tExamples:
- \t>>> cidrmatch('192.168.0.45', ['192.168.0.44', '192.168.0.45'])
- \t1
-
- \t>>> cidrmatch('192.168.0.43', ['192.168.0.44', '192.168.0.45'])
- \t0
-
- \t>>> cidrmatch('192.168.0.43', ['192.168.0.44', '192.168.0.45'], 24)
- \t1
- \t"""
- c = cidr(i, cidr_length)
- for ip in ipaddrs:
- if cidr(ip, cidr_length) == c:
- return True
- continue
-
- return False
-
-
- def cidr(i, n):
- """Convert an IP address string with a CIDR mask into a 32-bit
- \tinteger.
-
- \ti must be a string of numbers 0..255 separated by dots '.'::
- \tpre: forall([0 <= int(p) < 256 for p in i.split('.')])
-
- \tn is a number of bits to mask::
- \tpre: 0 <= n <= 32
-
- \tExamples:
- \t>>> bin2addr(cidr('192.168.5.45', 32))
- \t'192.168.5.45'
- \t>>> bin2addr(cidr('192.168.5.45', 24))
- \t'192.168.5.0'
- \t>>> bin2addr(cidr('192.168.0.45', 8))
- \t'192.0.0.0'
- \t"""
- return ~(MASK >> n) & MASK & addr2bin(i)
-
-
- def addr2bin(str):
- """Convert a string IPv4 address into an unsigned integer.
-
- \tExamples::
- \t>>> addr2bin('127.0.0.1')
- \t2130706433L
-
- \t>>> addr2bin('127.0.0.1') == socket.INADDR_LOOPBACK
- \t1
-
- \t>>> addr2bin('255.255.255.254')
- \t4294967294L
-
- \t>>> addr2bin('192.168.0.1')
- \t3232235521L
-
- \tUnlike DNS.addr2bin, the n, n.n, and n.n.n forms for IP addresses
- \tare handled as well::
- \t>>> addr2bin('10.65536')
- \t167837696L
- \t>>> 10 * (2 ** 24) + 65536
- \t167837696
-
- \t>>> addr2bin('10.93.512')
- \t173867520L
- \t>>> 10 * (2 ** 24) + 93 * (2 ** 16) + 512
- \t173867520
- \t"""
- return struct.unpack('!L', socket.inet_aton(str))[0]
-
-
- def bin2addr(addr):
- """Convert a numeric IPv4 address into string n.n.n.n form.
-
- \tExamples::
- \t>>> bin2addr(socket.INADDR_LOOPBACK)
- \t'127.0.0.1'
-
- \t>>> bin2addr(socket.INADDR_ANY)
- \t'0.0.0.0'
-
- \t>>> bin2addr(socket.INADDR_NONE)
- \t'255.255.255.255'
- \t"""
- return socket.inet_ntoa(struct.pack('!L', addr))
-
-
- def expand_one(expansion, str, joiner):
- if not str:
- return expansion
-
- (len, reverse, delimiters) = RE_ARGS.split(str)[1:4]
- if not delimiters:
- delimiters = '.'
-
- expansion = split(expansion, delimiters, joiner)
- if reverse:
- expansion.reverse()
-
- if len:
- expansion = expansion[-int(len) * 2 + 1:]
-
- return ''.join(expansion)
-
-
- def split(str, delimiters, joiner = None):
- """Split a string into pieces by a set of delimiter characters. The
- \tresulting list is delimited by joiner, or the original delimiter if
- \tjoiner is not specified.
-
- \tExamples:
- \t>>> split('192.168.0.45', '.')
- \t['192', '.', '168', '.', '0', '.', '45']
-
- \t>>> split('terry@wayforward.net', '@.')
- \t['terry', '@', 'wayforward', '.', 'net']
-
- \t>>> split('terry@wayforward.net', '@.', '.')
- \t['terry', '.', 'wayforward', '.', 'net']
- \t"""
- result = []
- element = ''
- for c in str:
- if c in delimiters:
- result.append(element)
- element = ''
- if joiner:
- result.append(joiner)
- else:
- result.append(c)
- joiner
- element += c
-
- result.append(element)
- return result
-
-
- def _test():
- import doctest
- import spf
- return doctest.testmod(spf)
-
- DNS.DiscoverNameServers()
- if __name__ == '__main__':
- import sys
- if len(sys.argv) == 1:
- print USAGE
- _test()
- elif len(sys.argv) == 2:
- q = query(i = '127.0.0.1', s = 'localhost', h = 'unknown')
- print q.dns_spf(sys.argv[1])
- elif len(sys.argv) == 4:
- print check(i = sys.argv[1], s = sys.argv[2], h = sys.argv[3])
- elif len(sys.argv) == 5:
- (i, s, h) = sys.argv[2:]
- q = query(i = i, s = s, h = h)
- print q.check(sys.argv[1])
- else:
- print USAGE
-
-